In [1]:
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Use the BigQuery storage connector and Spark MLlib to build a Linear Regression model and make predictions.
Use a Python 3 kernel (not PySpark) to allow you to configure the SparkSession in the notebook and include the spark-bigquery-connector required to use the BigQuery Storage API.
In [2]:
# Show the cluster's Scala version — presumably so the matching
# spark-bigquery-connector jar build (Scala 2.11 vs 2.12) can be chosen
# in the next cell; TODO confirm the jar below matches this version.
!scala -version
In [3]:
from pyspark.sql import SparkSession

# Build (or reuse) a SparkSession with the spark-bigquery-connector on the
# classpath, enabling DataFrame reads from BigQuery via the Storage API.
# NOTE(review): 'spark-bigquery-latest.jar' is the Scala-version-agnostic
# build; confirm it matches the cluster's Scala version printed above
# (newer images need the _2.12 jar).
spark = (
    SparkSession.builder
    .appName('1.3. BigQuery Storage & Spark MLlib - Python')
    .config('spark.jars', 'gs://spark-lib/bigquery/spark-bigquery-latest.jar')
    .getOrCreate()
)

# Display the Spark version as the cell's output.
spark.version
Out[3]:
In [4]:
# Render DataFrames as rich HTML tables when a frame is the last expression
# of a cell, instead of requiring an explicit .show().
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
In [5]:
# Fully-qualified BigQuery table id: project.dataset.table
table = 'bigquery-public-data.samples.natality'

# Read the public natality sample through the BigQuery connector.
# This is lazy — no data moves until an action is triggered.
df_natality_table = (
    spark.read
    .format("bigquery")
    .option("table", table)
    .load()
)
In [6]:
# Cap the number of rows pulled for this demo; raise for a fuller run.
limit = 10000

# Keep the label (weight_pounds) plus the candidate feature columns,
# drop rows with any missing value, and cache — this frame is reused
# by several cells below.
df_natality_select = (
    df_natality_table
    .select("weight_pounds", "mother_age", "father_age",
            "gestation_weeks", "weight_gain_pounds", "apgar_5min")
    .where("""
weight_pounds IS NOT NULL
AND mother_age IS NOT NULL
AND father_age IS NOT NULL
AND gestation_weeks IS NOT NULL
AND weight_gain_pounds IS NOT NULL
AND apgar_5min IS NOT NULL
""")
    .limit(limit)
    .cache()
)

df_natality_select.printSchema()
In [7]:
# Sanity check: how many rows survived the null filter (at most `limit`).
df_natality_select.count()
Out[7]:
Spark MLlib estimators expect a single vector column for features. Multiple columns can be combined into a single vector column using `VectorAssembler`.
In [8]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

# MLlib estimators expect all predictors packed into one vector column;
# assemble the five numeric features into a "features" column.
feature_cols = [
    "mother_age",
    "father_age",
    "gestation_weeks",
    "weight_gain_pounds",
    "apgar_5min",
]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

df_assembler_output = assembler.transform(df_natality_select)
df_assembler_output
Out[8]:
In [9]:
# Shape the frame the way MLlib regressors expect: a "features" vector
# column and a "label" column (the birth weight we want to predict).
df_training_data = (
    df_assembler_output
    .select("features", "weight_pounds")
    .withColumnRenamed("weight_pounds", "label")
)

# Cache — this frame feeds both the split and the model fit below.
df_training_data.cache()
df_training_data
Out[9]:
In [10]:
(df_training, df_test) = df_training_data.randomSplit([0.7, 0.3])
Import and use the LinearRegression model.
In [11]:
from pyspark.ml.regression import LinearRegression

# Linear regression with light L2 regularization, solved via the
# "normal" (normal-equation) solver.
linear_regression = LinearRegression(maxIter=5, regParam=0.2, solver="normal")

# Fit on the 70% training split; `model` is reused by later cells.
model = linear_regression.fit(df_training)
In [12]:
# Fitted parameters: one coefficient per feature vector entry, plus the intercept.
print(f"Coefficients:{model.coefficients}")
print(f"Intercept:{model.intercept}")
In [13]:
# Fit diagnostics computed on the training split.
trainingSummary = model.summary
print(f"numIterations: {trainingSummary.totalIterations:d}")
print(f"objectiveHistory: {trainingSummary.objectiveHistory}")
print(f"RMSE: {trainingSummary.rootMeanSquaredError:f}")
print(f"r2: {trainingSummary.r2:f}")

# Residuals (label - prediction) on the training data, as the cell output.
trainingSummary.residuals
Out[13]:
In [15]:
# Score the held-out 30% split with the fitted model.
predictions = model.transform(df_test)

# Show predicted vs. actual birth weight alongside the input features.
predictions.select("prediction", "label", "features")
Out[15]:
In [16]:
from pyspark.ml.evaluation import RegressionEvaluator

# RMSE on the held-out test data — the generalization counterpart to the
# training RMSE reported by the model summary.
evaluator = RegressionEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="rmse",
)
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) on test data = {rmse:g}")